МОСКОВСКИЙ ГОСУДАРСТВЕННЫЙ УНИВЕРСИТЕТ
имени М.В. Ломоносова
Факультет вычислительной математики и кибернетики








Практикум по курсу
"Распределённые системы"
Алгоритм MPI_Scatter для транспьютерной матрицы
Надежный алгоритм для задачи Sor3D
ОТЧЕТ
о выполненном задании

студента 421 учебной группы факультета ВМК МГУ
Арефьева Вениамина Андреевича
Москва, 2022 г.

Весь код и используемые файлы можно найти тут(кликабельно)

Часть 1. Постановка задачи¶

Часть 1.1 Алгоритм MPI_Scatter для транспьютерной матрицы¶

В транспьютерной матрице размером 5*5, в каждом узле которой находится один процесс, необходимо выполнить операцию рассылки данных (длиной 4 байта) всем процессам от одного (MPI_SCATTER) - процесса с координатами (0,0).

Реализовать программу, моделирующую выполнение операции MPI_SCATTER на транспьютерной матрице при помощи пересылок MPI типа точка-точка.

Получить временную оценку работы алгоритма. Оценить сколько времени потребуется для выполнения операции MPI_SCATTER, если все процессы выдали ее одновременно. Время старта равно 100, время передачи байта равно 1 (Ts=100,Tb=1). Процессорные операции, включая чтение из памяти и запись в память, считаются бесконечно быстрыми.

Часть 1.2 Надежный алгоритм для задачи Sor3D¶

Доработать MPI-программу, реализованную в рамках курса “Суперкомпьютеры и параллельная обработка данных”.

Добавить контрольные точки для продолжения работы программы в случае сбоя. Реализовать один из 3-х сценариев работы после сбоя:

  • продолжить работу программы только на “исправных” процессах
  • вместо процессов, вышедших из строя, создать новые MPI-процессы, которые необходимо использовать для продолжения расчетов
  • при запуске программы на счет сразу запустить некоторое дополнительное количество MPI-процессов, которые использовать в случае сбоя

Часть 2. Описание алгоритма¶

Часть 2.1 Алгоритм MPI_Scatter для транспьютерной матрицы¶

По условию необходимо осуществить передачу данных(4 байта(инт)) от узла (0, 0) до всех остальных узлов. Так же в условии сказано, что матрица размера (5,5), но решение можно обобщить до произвольной квадратной матрицы(размерности 2 и более).

Поскольку передачи в транспьютерной матрице возможны только к соседним узлам, то можно вычислять расстояние между какими-либо ячейками в количестве этих передач. А именно оно равно сумме модулей разности между координатами узлов, и это будет минимальное количество передач, потому что при меньшем количестве передач можно прийти к логичесткому противоречию с условием передачи данных только соседним узлам.

На данной диаграмме можно увидеть схему работы программы.

Она сначала распространяет число между осями с началом в со всеми узлами с первой нулевой координатой. Далее все эти процессы начинают передавать число вниз по цепочке до последнего. И получается, что самая долгая передача до узла с индексами (N-1, N-1). Причёт отличительная особенно алгоритма в том, что фактически никаких синхронизации в нём не нужны, каждый процесс знает от кого он принимает данные и кому он должен их передать, поэтому каждый процесс может завершаться сразу же после выполнения своих функций, не дожидаясь завершения всех остальных процессов.

Время одной передачи вычисляется как время старта + время на передачу байтаколичество байт. В нашем случае это формула имеет 100 + 1 4 = 104 для одной передачи. Несложно вычислить, что количество передач до узла (N-1, N-1) равно N-1 + N-1 = 2*N -2. Если подставить N из условия, то есть 5, то получается ответ 832. А ниже представлен обобщённый график.

In [1]:
import plotly.express as px
In [2]:
def calculate_time(size, ts = 100, tb = 1, b_count = 4):
    return (size-1) * 2 * (ts + tb * b_count)

data = {
    "x":[*range(2, 20)]
}
data["y"] = list(map(calculate_time, data["x"]))

fig = px.line(data, labels= {"x":"Size of the transputer matrix", "y":"MPI_SCATTER elapsed time"}, markers=True,
              x="x", y="y",title="The dependence of the execution time on the size of the transputer matrix")
fig.show()

Часть 2.2. Надежный алгоритм для задачи Sor3D¶

Было принято решение о расставлении контрольных точек и в случае каких-либо сбоев осуществляется возврат к этим контрольным точкам, удаление повреждённых процессов, создание новых и последующая синхронизация. Более подробно алгоритм можно увидеть на данной блоксхеме.

Такой подход позволяет программе спокойно выполниться, независимо от количества сбоев, потому что она может порождать неограниченное каким-то фиксированным числом количество процессов.

Основная часть кода - обработка ошибочного состояния процесса, его выделение из общего коммуникатора, удаление, создание нового процесса и его интеграция в существующий коммуникатор таким образом, чтобы сохранить номер утерянного процесса для того, чтобы не нагружать полезный код заботой о правильном распределении работы между старыми и новыми процессами.

При возникновении какой-либо ошибки MPI директивы об это уведомляются все процессы группы(основного коммуникатора) и они сразу откатываются к контрольной точке.

Механизм возникновения ошибок реализован через генератор случайных чисел. Каждый процесс на каждой итерации алгоритма с рансом 1/100 может выйти из строя и завершиться аварийно.(кроме 0 процесса, потому что он является ведущим). Если происходит данная ситуациия, то весь остальной код узнает об этом только тогда, когда прочитает сообщение статуса MPI директивы, а после этого уведомит все процессы группы. После этого последует полный откат всех вычислений и восстановление предыдущей итерации основного цикла.

Часть 3. Способы запуска программы¶

Часть 3.1 Алгоритм MPI_Scatter для транспьютерной матрицы¶

Можно запускать прилагаемый код c помощью bash script файла start.sh. С помощью команды . start.sh <количество процессов>

Содержимое файла start.sh:

mpicc main.c -o test -lm
mpiexec -n $1 ./test

Часть 3.2. Надежный алгоритм для задачи Sor3D¶

Для начала нужно запустить docker container. Это можно сделать c помощью прилагаемого bash script файла start_docker.sh. С помощью команды . start_docker.sh

Содержимое файла start_docker.sh:

ulfm_image=abouteiller/mpi-ft-ulfm
docker pull $ulfm_image

Можно запускать прилагаемый файл start.sh c помощью bash script файла start. С помощью команды . start.sh <количество процессов>

Содержимое файла start.sh:

function mpirun {
    docker run --user $(id -u):$(id -g) --cap-drop=all --security-opt label:disabled -v $PWD:/sandbox $ulfm_image mpirun --map-by :oversubscribe --mca btl tcp,self $@
}
function mpicc {
    docker run --user $(id -u):$(id -g) --cap-drop=all --security-opt label:disabled -v $PWD:/sandbox $ulfm_image mpicc $@
}

mpicc mpi_sor_3d_upgraded.c -o test
mpirun --with-ft ulfm -n $1 ./test

Часть 4. Примеры запуска и вывод программы¶

Часть 4.1 Алгоритм MPI_Scatter для транспьютерной матрицы¶

Часть 4.2. Надежный алгоритм для задачи Sor3D¶

Часть 5. Выводы¶

Был реализован алгоритм MPI_Scatter с использованием пересылок типа точка-точка и средств MPI. Также была выведена теоретическая временная оценка предложенного алгоритма.

Была реализована надёжная версия алгоримта Sor_3D, которая устойчива к смерти условно бесконечного числа процессов. Этого удалось достичь, благодаря реализации метода создания асинхронных контрольных точек и восстановления из контрольных точек утерянных процессов.

Часть 6. Приложения¶

Часть 6.1 Алгоритм MPI_Scatter для транспьютерной матрицы¶

main.c:

#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <mpi.h>


MPI_Request send_int(int x, int y, MPI_Comm comm, int *data) {
    int target_rank;
    int coords[2] = {x, y};
    MPI_Cart_rank(comm, coords, &target_rank);

    MPI_Request request;
    MPI_Isend(data, 1, MPI_INT, target_rank,
              0, MPI_COMM_WORLD, &request);
    return request;
}

int receive_int(int x, int y, MPI_Comm comm) {
    int target_rank;
    int coords[2] = {x, y};
    MPI_Cart_rank(comm, coords, &target_rank);

    int data;
    MPI_Recv(&data, 1, MPI_INT, target_rank,
             MPI_ANY_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    return data;
}

int main(int argc, char **argv) {
    int debug = 1;
    int size = 3;

    MPI_Init(&argc, &argv);

    MPI_Comm_size(MPI_COMM_WORLD, &size);

    size = (int) sqrt((double) size);

    int rank, r_size;

    int ndims = 2;

    int n_size[2] = {size, size};
    int periodic[2] = {0, 0};

    MPI_Comm cart_comm;

    MPI_Cart_create(MPI_COMM_WORLD, ndims, n_size, periodic, 1, &cart_comm);

    MPI_Comm_rank(cart_comm, &rank);
    MPI_Comm_size(cart_comm, &r_size);

    int mpi_map[size][size];
    for (int i = 0; i < size * size; i++) {
        int coords[2];
        MPI_Cart_coords(cart_comm, i, ndims, coords);
        mpi_map[coords[0]][coords[1]] = i;
    }

    srand(r_size + size);

    int coords[2];
    MPI_Cart_coords(cart_comm, rank, ndims, coords);
//    printf("My id = %2d, my coords is <%2d, %2d>\n", rank, coords[0], coords[1]);


    int number;
    if (coords[0] == 0) {
        if (coords[1] == 0) {
            number = rand();
        } else {
            number = receive_int(coords[0], coords[1] - 1, cart_comm);
            printf("Got number, my coords is <%2d, %2d>\n", coords[0], coords[1]);
        }

        MPI_Request r1, r2;
        if (coords[1] + 1 != size) {
            r1 = send_int(coords[0], coords[1] + 1, cart_comm, &number);
        }
        r2 = send_int(coords[0] + 1, coords[1], cart_comm, &number);
        if (coords[1] + 1 != size) {
            MPI_Wait(&r1, MPI_STATUS_IGNORE);
        }
        MPI_Wait(&r2, MPI_STATUS_IGNORE);
    } else {
        number = receive_int(coords[0] - 1, coords[1], cart_comm);
        printf("Got number, my coords is <%2d, %2d>\n", coords[0], coords[1]);
        MPI_Request r1;
        if (coords[0] + 1 != size) {
            r1 = send_int(coords[0] + 1, coords[1], cart_comm, &number);
            MPI_Wait(&r1, MPI_STATUS_IGNORE);
        }
    }

    MPI_Finalize();
    return 0;
}

Часть 6.2. Надежный алгоритм для задачи Sor3D¶

mpi_sor_3d_upgraded.c:

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <signal.h>
#include <unistd.h>
#include <math.h>
#include <mpi.h>
#include <mpi-ext.h>
#include <time.h>


int rank = MPI_PROC_NULL, verbose = 1; /* makes this global (for printfs) */
char **gargv;

int MPIX_Comm_replace(MPI_Comm comm, MPI_Comm *newcomm) {
    MPI_Comm icomm, /* the intercomm between the spawnees and the old (shrinked) world */
    scomm, /* the local comm for each sides of icomm */
    mcomm; /* the intracomm, merged from icomm */
    MPI_Group cgrp, sgrp, dgrp;
    int rc, flag, rflag, i, nc, ns, nd, crank, srank, drank;

    redo:
    if (comm == MPI_COMM_NULL) { /* am I a new process? */
        /* I am a new spawnee, waiting for my new rank assignment
         * it will be sent by rank 0 in the old world */
        MPI_Comm_get_parent(&icomm);
        scomm = MPI_COMM_WORLD;
        MPI_Recv(&crank, 1, MPI_INT, 0, 1, icomm, MPI_STATUS_IGNORE);
        if (verbose) {
            MPI_Comm_rank(scomm, &srank);
            printf("Spawnee %d: crank=%d\n", srank, crank);
        }
    } else {
        /* I am a survivor: Spawn the appropriate number
         * of replacement processes (we check that this operation worked
         * before we process further) */
        /* First: remove dead processes */
        MPIX_Comm_shrink(comm, &scomm);
        MPI_Comm_size(scomm, &ns);
        MPI_Comm_size(comm, &nc);
        nd = nc - ns; /* number of deads */
        if (0 == nd) {
            /* Nobody was dead to start with. We are done here */
            MPI_Comm_free(&scomm);
            *newcomm = comm;
            return MPI_SUCCESS;
        }
        /* We handle failures during this function ourselves... */
        MPI_Comm_set_errhandler(scomm, MPI_ERRORS_RETURN);

        rc = MPI_Comm_spawn(gargv[0], &gargv[1], nd, MPI_INFO_NULL,
                            0, scomm, &icomm, MPI_ERRCODES_IGNORE);
        flag = (MPI_SUCCESS == rc);
        MPIX_Comm_agree(scomm, &flag);
        if (!flag) {
            if (MPI_SUCCESS == rc) {
                MPIX_Comm_revoke(icomm);
                MPI_Comm_free(&icomm);
            }
            MPI_Comm_free(&scomm);
            if (verbose) fprintf(stderr, "%04d: comm_spawn failed, redo\n", rank);
            goto redo;
        }

        /* remembering the former rank: we will reassign the same
         * ranks in the new world. */
        MPI_Comm_rank(comm, &crank);
        MPI_Comm_rank(scomm, &srank);
        /* the rank 0 in the scomm comm is going to determine the
         * ranks at which the spares need to be inserted. */
        if (0 == srank) {
            /* getting the group of dead processes:
             *   those in comm, but not in scomm are the deads */
            MPI_Comm_group(comm, &cgrp);
            MPI_Comm_group(scomm, &sgrp);
            MPI_Group_difference(cgrp, sgrp, &dgrp);
            /* Computing the rank assignment for the newly inserted spares */
            for (i = 0; i < nd; i++) {
                MPI_Group_translate_ranks(dgrp, 1, &i, cgrp, &drank);
                /* sending their new assignment to all new procs */
                MPI_Send(&drank, 1, MPI_INT, i, 1, icomm);
            }
            MPI_Group_free(&cgrp);
            MPI_Group_free(&sgrp);
            MPI_Group_free(&dgrp);
        }
    }

    /* Merge the intercomm, to reconstruct an intracomm (we check
     * that this operation worked before we proceed further) */
    rc = MPI_Intercomm_merge(icomm, 1, &mcomm);
    rflag = flag = (MPI_SUCCESS == rc);
    MPIX_Comm_agree(scomm, &flag);
    if (MPI_COMM_WORLD != scomm) MPI_Comm_free(&scomm);
    MPIX_Comm_agree(icomm, &rflag);
    MPI_Comm_free(&icomm);
    if (!(flag && rflag)) {
        if (MPI_SUCCESS == rc) {
            MPI_Comm_free(&mcomm);
        }
        if (verbose) fprintf(stderr, "%04d: Intercomm_merge failed, redo\n", rank);
        goto redo;
    }

    /* Now, reorder mcomm according to original rank ordering in comm
     * Split does the magic: removing spare processes and reordering ranks
     * so that all surviving processes remain at their former place */
    rc = MPI_Comm_split(mcomm, 1, crank, newcomm);

    /* Split or some of the communications above may have failed if
     * new failures have disrupted the process: we need to
     * make sure we succeeded at all ranks, or retry until it works. */
    flag = (MPI_SUCCESS == rc);
    MPIX_Comm_agree(mcomm, &flag);
    MPI_Comm_free(&mcomm);
    if (!flag) {
        if (MPI_SUCCESS == rc) {
            MPI_Comm_free(newcomm);
        }
        if (verbose) fprintf(stderr, "%04d: comm_split failed, redo\n", rank);
        goto redo;
    }

    /* restore the error handler */
    if (MPI_COMM_NULL != comm) {
        MPI_Errhandler errh;
        MPI_Comm_get_errhandler(comm, &errh);
        MPI_Comm_set_errhandler(*newcomm, errh);
    }

    return MPI_SUCCESS;
}


#define  Max(a, b) ((a)>(b)?(a):(b))
#define  N   ((2<<5)+2)

int slider = 0;
double maxeps = 0.1e-7;
double A[2][N][N][N];
double receive_buf[N][N][N];

void init();

void verify();

int main(int argc, char **argv) {
    MPI_Comm world; /* a world comm for the work, w/o the spares */
    MPI_Comm rworld; /* and a temporary handle to store the repaired copy */
    int size, spare;
    int ret_code; /* error code from MPI functions */
    char estr[MPI_MAX_ERROR_STRING] = "";
    int strl; /* error messages */
    double start, tff = 0, twf = 0; /* timings */
    int was_error;

    gargv = argv;
    MPI_Init(&argc, &argv);


    /* Am I a spare ? */
    MPI_Comm_get_parent(&world);
    if (MPI_COMM_NULL == world) {
        /* First run: Let's create an initial world,
         * a copy of MPI_COMM_WORLD */
        MPI_Comm_dup(MPI_COMM_WORLD, &world);
        MPI_Comm_size(world, &size);
        MPI_Comm_rank(world, &rank);
        /* We set an errhandler on world, so that a failure is not fatal anymore. */
        MPI_Comm_set_errhandler(world, MPI_ERRORS_RETURN);
    } else {
        /* I am a spare, lets get the repaired world */
        MPIX_Comm_replace(MPI_COMM_NULL, &world);
        MPI_Comm_size(world, &size);
        MPI_Comm_rank(world, &rank);
        /* We set an errhandler on world, so that a failure is not fatal anymore. */
        MPI_Comm_set_errhandler(world, MPI_ERRORS_RETURN);
        //without goto
    }

    double time_start = MPI_Wtime();

    int line_start[size];
    int line_count[size];

    int n2 = N * N;

    for (int i = 0; i < size; i++) {
        line_count[i] = n2 / size;
    }

    for (int i = 0; i < n2 - (n2 / size) * size; i++) {
        line_count[i]++;
    }

    line_start[0] = 0;
    for (int i = 1; i < size; i++) {
        line_start[i] = line_start[i - 1] + line_count[i - 1];
    }

    if (rank == -1) { // just debug information
        for (int i = 0; i < size; i++) {
            printf("%5d", line_start[i]);
        }
        printf("\n");

        for (int i = 0; i < size; i++) {
            printf("%5d", line_count[i]);
        }
        printf("\n");
    }

    int receive_counts[size];
    int start_offset[size];
    for (int i = 0; i < size; i++) {
        receive_counts[i] = line_count[i] * N;
        start_offset[i] = line_start[i] * N;
    }

    if (rank == 0) { //init only in main process
        init();
    }

    int itmax = 100;

    srand(time(NULL));

    for (int iteration = 1; iteration < itmax; ++iteration) {
        MPI_Bcast(&iteration, 1, MPI_INT, 0, world);

        if ((rand() % 100 == 1) && rank != 0) {
            printf("Rank %04d: committing suicide\n", rank);
            raise(SIGKILL);
        }

        ret_code = MPI_Bcast(A[slider], N * N * N, MPI_DOUBLE, 0, world);;

        was_error = ret_code != MPI_SUCCESS;
        MPI_Allreduce(&was_error, &was_error, 1, MPI_INT, MPI_MAX, world);

        if (was_error) {
            printf("Bcast failed, restarting... %d\n", iteration);
            MPIX_Comm_replace(world, &rworld);
            MPI_Comm_free(&world);
            world = rworld;
            --iteration;
            continue;
        }

        double eps = 0.;
        //relax;
        int k;
        for (int iterat = line_start[rank]; iterat < line_start[rank] + line_count[rank]; iterat++) {
            int i_cof = iterat / N;
            int j_cof = iterat % N;
            if (i_cof >= 1 && i_cof <= (N - 2) && j_cof >= 1 && j_cof <= (N - 2)) {
                for (k = 1; k <= N - 2; k++) {
                    double e = A[slider][i_cof][j_cof][k];
                    A[slider ^ 1][i_cof][j_cof][k] = (A[slider][i_cof - 1][j_cof][k] + A[slider][i_cof + 1][j_cof][k]
                                                      + A[slider][i_cof][j_cof - 1][k] + A[slider][i_cof][j_cof + 1][k]
                                                      + A[slider][i_cof][j_cof][k - 1] +
                                                      A[slider][i_cof][j_cof][k + 1]) / 6.;
                    eps = Max(eps, fabs(e - A[slider ^ 1][i_cof][j_cof][k]));
                }
            }
        }

        slider ^= 1;

        ret_code = MPI_Gatherv(A[slider][line_start[rank] / N][line_start[rank] % N], line_count[rank] * N, MPI_DOUBLE,
                               receive_buf[line_start[rank] / N][line_start[rank] % N], receive_counts, start_offset,
                               MPI_DOUBLE, 0, world);

        slider ^= 1;

        memcpy(A[slider], receive_buf, sizeof(double) * N * N * N);

        was_error = ret_code != MPI_SUCCESS;
        MPI_Allreduce(&was_error, &was_error, 1, MPI_INT, MPI_MAX, world);

        if (was_error) {
            printf("Gatherv failed, restarting... %d\n", iteration);
            MPIX_Comm_replace(world, &rworld);
            MPI_Comm_free(&world);
            world = rworld;
            --iteration;
            continue;
        }

        if (rank == 0) {  // just debug information
            printf("it=%4i slider = %3d \n", iteration, slider);
            verify();
        }
    }


    MPI_Comm_free(&world);

    MPI_Finalize();
    return EXIT_SUCCESS;
}

void init() {
    int i, j, k;
    for (i = 0; i <= N - 1; i++) {
        for (j = 0; j <= N - 1; j++) {
            for (k = 0; k <= N - 1; k++) {
                if (i == 0 || i == N - 1 || j == 0 || j == N - 1 || k == 0 || k == N - 1) {
                    A[slider][i][j][k] = 0.;
                } else {
                    A[slider][i][j][k] = (4. + i + j + k);
                }
            }
        }
    }
}

void verify() {
    double s = 0.;
    int i, j, k;
    for (i = 0; i <= N - 1; i++) {
        for (j = 0; j <= N - 1; j++) {
            for (k = 0; k <= N - 1; k++) {
                s = s + A[slider][i][j][k] * (i + 1) * (j + 1) * (k + 1) / (N * N * N);
            }
        }
    }
}
In [ ]: